{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n",
    "     align=\"right\"\n",
    "     width=\"30%\"\n",
    "     alt=\"Dask logo\\\">\n",
    "\n",
    "# Distributed - spread your data and computation across a cluster\n",
    "\n",
    "As we covered at the beginning Dask has the ability to run work on multiple machines using the distributed scheduler.\n",
    "\n",
    "Until now we have actually been using the distributed scheduler for our work, but just on a single machine.\n",
    "\n",
    "When we instantiate a `Client()` object with no arguments it will attempt to locate a Dask cluster. It will check your local Dask config and environment variables to see if connection information has been specified. If not it will create an instance of `LocalCluster` and use that.\n",
    "\n",
    "*Specifying connection information in config is useful for system administrators to provide access to their users. We do this in the [Dask Helm Chart for Kubernetes](https://github.com/dask/helm-chart/blob/master/dask/templates/dask-jupyter-deployment.yaml#L46-L48), the chart installs a multi-node Dask cluster and a Jupyter server on a Kubernetes cluster and Jupyter is preconfigured to discover the distributed cluster.*"
   ]
  },
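  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough illustration of the config lookup described above, the sketch below sets and reads back the `scheduler-address` config key. The address is a made-up placeholder, and the assumption that this key (or the matching `DASK_SCHEDULER_ADDRESS` environment variable) is what `Client()` consults is worth checking against your Dask version.\n",
    "\n",
    "```python\n",
    "import dask\n",
    "\n",
    "# Hypothetical address, used only for illustration.\n",
    "example_address = \"tcp://scheduler.example.com:8786\"\n",
    "\n",
    "# Temporarily set the config key; it reverts when the block exits.\n",
    "with dask.config.set({\"scheduler-address\": example_address}):\n",
    "    configured = dask.config.get(\"scheduler-address\")\n",
    "\n",
    "print(configured)\n",
    "```\n",
    "\n",
    "With such a value in place, a bare `Client()` would be expected to try that address rather than start a `LocalCluster`."
   ]
  },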
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Local Cluster\n",
    "\n",
    "Let's explore the `LocalCluster` object ourselves and see what it is doing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import LocalCluster, Client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster = LocalCluster()\n",
    "cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Creating a cluster object will create a Dask scheduler and a number of Dask workers. If no arguments are specified then it will autodetect the number of CPU cores your system has and the amount of memory and create workers to appropriately fill that.\n",
    "\n",
    "You can also specify these arguments yourself. Let's have a look at the docstring to see the options we have available.\n",
    "\n",
    "*These arguments can also be passed to `Client` and in the case where it creates a `LocalCluster` they will just be passed on down the line.*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "?LocalCluster"
   ]
  },
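  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For example, a few of the more commonly used arguments can be set explicitly. This is a minimal sketch with arbitrary example values; the variable name `small_cluster` is ours, and `processes=False` is chosen only so that the workers run as threads inside the current process.\n",
    "\n",
    "```python\n",
    "from dask.distributed import LocalCluster\n",
    "\n",
    "# Example values only; choose numbers that suit your machine.\n",
    "small_cluster = LocalCluster(\n",
    "    n_workers=2,  # number of workers to start\n",
    "    threads_per_worker=1,  # threads in each worker\n",
    "    memory_limit=\"1GiB\",  # memory limit per worker\n",
    "    processes=False,  # run workers as threads in this process\n",
    "    dashboard_address=\":0\",  # let the dashboard pick any free port\n",
    ")\n",
    "n_started = len(small_cluster.workers)\n",
    "print(n_started)\n",
    "small_cluster.close()\n",
    "```\n",
    "\n",
    "Leaving `processes` at its default starts each worker in its own process, which is usually what you want for real workloads."
   ]
  },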
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Our cluster object has attributes and methods which we can use to access information about our cluster. For instance we can get the log output from the scheduler and all the workers with the `get_logs()` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster.get_logs()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can access the url that the Dask dashboard is being hosted at."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster.dashboard_link"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order for Dask to use our cluster we still need to create a `Client` object, but as we have already created a cluster we can pass that directly to our client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = Client(cluster)\n",
    "client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "del client, cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Remote clusters via SSH\n",
    "\n",
    "A common way to distribute your work onto multiple machines is via SSH. Dask has a cluster manager which will handle creating SSH connections for you called `SSHCluster`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```python\n",
    "from dask.distributed import SSHCluster\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When constructing this cluster manager we need to pass a list of addresses, either hostnames or IP addresses, which we will SSH into and attempt to start a Dask scheduler or worker on."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```python\n",
    "cluster = SSHCluster([\"localhost\", \"hostA\", \"hostB\"])\n",
    "cluster\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When we create our `SSHCluster` object we have given a list of three hostnames.\n",
    "\n",
    "The first host in the list will be used as the scheduler, all other hosts will be used as workers. If you're on the same network it wouldn't be unreasonable to set your local machine as the scheduler and then use other machines as workers.\n",
    "\n",
    "If your servers are remote to you, in the cloud for instance, you may want the scheduler to be a remote machine too to avoid network bottlenecks."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Scalable clusters\n",
    "\n",
    "Both of the clusters we have seen so far are fixed size clusters. We are either running locally and using all the resources in our machine, or we are using an explicit number of other machines via SSH.\n",
    "\n",
    "With some cluster managers it is possible to increase and descrease the number of workers either by calling `cluster.scale(n)` in your code where `n` is the desired number of workers. Or you can let Dask do this dynamically by calling `cluster.adapt(minimum=1, maximum=100)` where minimum and maximum are your preferred limits for Dask to abide to.\n",
    "\n",
    "It is always good to keep your minimum to at least 1 as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup so keeping this at 1 or above means this profilling will start immediately.\n",
    "\n",
    "We currently have cluster managers for [Kubernetes](https://kubernetes.dask.org/en/latest/), [Hadoop/Yarn](https://yarn.dask.org/en/latest/), [cloud platforms](https://cloudprovider.dask.org/en/latest/) and [batch systems including PBS, SLURM and SGE](http://jobqueue.dask.org/en/latest/).\n",
    "\n",
    "These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters on to them. If an institution wishes to provide a central service that users can request Dask clusters from there is also [Dask Gateway](https://gateway.dask.org/)."
   ]
  },
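  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The two calls described above can be tried on a single machine. The sketch below assumes that `LocalCluster` is acceptable as a stand-in for a scalable cluster manager; the names `scalable` and `scale_client` and the limits used are illustrative only.\n",
    "\n",
    "```python\n",
    "from dask.distributed import Client, LocalCluster\n",
    "\n",
    "# processes=False keeps the workers in this process so the sketch is cheap to run.\n",
    "scalable = LocalCluster(\n",
    "    n_workers=1, threads_per_worker=1, processes=False, dashboard_address=\":0\"\n",
    ")\n",
    "scale_client = Client(scalable)\n",
    "\n",
    "# Ask for exactly three workers, then wait until they have joined.\n",
    "scalable.scale(3)\n",
    "scale_client.wait_for_workers(3)\n",
    "n_after_scale = len(scale_client.scheduler_info()[\"workers\"])\n",
    "print(n_after_scale)\n",
    "\n",
    "# Alternatively, let Dask choose a size between the given limits.\n",
    "adaptive = scalable.adapt(minimum=1, maximum=4)\n",
    "\n",
    "scale_client.close()\n",
    "scalable.close()\n",
    "```\n",
    "\n",
    "On a real cluster manager the same two calls would request or release machines, pods or batch jobs, so new workers may take noticeably longer to appear."
   ]
  },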
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cluster components\n",
    "\n",
    "The minimum requirements for a functioning Dask cluster is a scheduler process and one worker process.\n",
    "\n",
    "We can start these processes manually via the CLI. Let's start with the scheduler.\n",
    "\n",
    "```console\n",
    "$ dask-scheduler                                                               \n",
    "2022-07-07 14:11:35,661 - distributed.scheduler - INFO - -----------------------------------------------\n",
    "2022-07-07 14:11:37,405 - distributed.scheduler - INFO - State start\n",
    "2022-07-07 14:11:37,408 - distributed.scheduler - INFO - -----------------------------------------------\n",
    "2022-07-07 14:11:37,409 - distributed.scheduler - INFO - Clear task state\n",
    "2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   Scheduler at:   tcp://10.51.100.80:8786\n",
    "2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   dashboard at:                     :8787\n",
    "```\n",
    "\n",
    "Then we can connect a worker on the address that the scheduler is listening on.\n",
    "\n",
    "```console\n",
    "$ dask-worker tcp://10.51.100.80:8786 --nworkers=auto\n",
    "2022-07-07 14:12:53,915 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58051'\n",
    "2022-07-07 14:12:53,922 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58052'\n",
    "2022-07-07 14:12:53,924 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58053'\n",
    "2022-07-07 14:12:53,925 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58054'\n",
    "2022-07-07 14:12:55,222 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58065\n",
    "2022-07-07 14:12:55,222 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58065\n",
    "2022-07-07 14:12:55,223 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58068\n",
    "2022-07-07 14:12:55,223 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,223 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,223 - distributed.worker - INFO -               Threads:                          3\n",
    "2022-07-07 14:12:55,223 - distributed.worker - INFO -                Memory:                   4.00 GiB\n",
    "2022-07-07 14:12:55,224 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-hlvac6m5\n",
    "2022-07-07 14:12:55,225 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58066\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58066\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58070\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,227 - distributed.worker - INFO -               Threads:                          3\n",
    "2022-07-07 14:12:55,228 - distributed.worker - INFO -                Memory:                   4.00 GiB\n",
    "2022-07-07 14:12:55,228 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-e1suf_7o\n",
    "2022-07-07 14:12:55,229 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,231 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58063\n",
    "2022-07-07 14:12:55,233 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58063\n",
    "2022-07-07 14:12:55,233 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58067\n",
    "2022-07-07 14:12:55,233 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,233 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,234 - distributed.worker - INFO -               Threads:                          3\n",
    "2022-07-07 14:12:55,234 - distributed.worker - INFO -                Memory:                   4.00 GiB\n",
    "2022-07-07 14:12:55,235 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-oq39ihb4\n",
    "2022-07-07 14:12:55,236 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,246 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,246 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,249 - distributed.core - INFO - Starting established connection\n",
    "2022-07-07 14:12:55,264 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,264 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,267 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,267 - distributed.core - INFO - Starting established connection\n",
    "2022-07-07 14:12:55,267 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,269 - distributed.core - INFO - Starting established connection\n",
    "2022-07-07 14:12:55,273 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58064\n",
    "2022-07-07 14:12:55,273 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58064\n",
    "2022-07-07 14:12:55,273 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58069\n",
    "2022-07-07 14:12:55,273 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,274 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,274 - distributed.worker - INFO -               Threads:                          3\n",
    "2022-07-07 14:12:55,275 - distributed.worker - INFO -                Memory:                   4.00 GiB\n",
    "2022-07-07 14:12:55,275 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-zfie55ku\n",
    "2022-07-07 14:12:55,276 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,299 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786\n",
    "2022-07-07 14:12:55,300 - distributed.worker - INFO - -------------------------------------------------\n",
    "2022-07-07 14:12:55,302 - distributed.core - INFO - Starting established connection\n",
    "```\n",
    "\n",
    "Then in Python we can connect a client to this cluster and submit some work.\n",
    "\n",
    "```python\n",
    ">>> from dask.distributed import Client\n",
    ">>> client = Client(\"tcp://10.51.100.80:8786\")\n",
    ">>> client.submit(lambda: 1+1)\n",
    "```\n",
    "\n",
    "We can also do this in Python by importing cluster components and creating them directly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Scheduler, Worker, Client\n",
    "\n",
    "async with Scheduler() as scheduler:\n",
    "    async with Worker(scheduler.address) as worker:\n",
    "        async with Client(scheduler.address, asynchronous=True) as client:\n",
    "            print(await client.submit(lambda: 1 + 1))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Most of the time we never have to create these components ourselves and instead can rely on cluster manager objects to do this for us. But in some situations it can be useful to be able to contruct a cluster yourself manually.\n",
    "\n",
    "You may also see a `Nanny` process being referenced from time to time. This is a wrapper for the worker that handles restarting the process if it is killed. When we run `dask-worker` via the CLI a nanny is automatically created for us."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cluster networking\n",
    "\n",
    "By default Dask uses a custom TCP based remote procedure call protocol to communicate between processes. The scheduler and workers all listen on TCP ports for communication.\n",
    "\n",
    "When you start a scheduler it typically listens on port `8786`. When a worker is created it listens on a random high port and communicates that port to the scheduler when it first connects.\n",
    "\n",
    "The scheduler maintains a list of all workers and their address which can be accessed via the workers, therefore both the scheduler and any of the workers can open connections to any other worker at any time. Connections are closed automatically when not in use.\n",
    "\n",
    "The `Client` will only ever connect to the scheduler and all communication to the workers will pass through it. This means that when deploying Dask clusters the scheduler and workers must typically be on the same network and able to access each other via IP and port directly. But the client can run wherever as long as it can access the scheduler communication port. It is common to configure firewall rules or load balancers to provide access to just the scheduler port.\n",
    "\n",
    "Dask also supports other network protocols such as [TLS](https://distributed.dask.org/en/stable/tls.html), [websockets](https://distributed.dask.org/en/stable/protocol.html) and [UCX](https://docs.rapids.ai/api/dask-cuda/nightly/examples/ucx.html)."
   ]
  },
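  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see the kind of addresses this section describes, you can ask a running cluster for them. This is a minimal sketch that assumes a throwaway `LocalCluster` forced onto TCP with `protocol=\"tcp://\"`; the names `net_cluster` and `net_client` are ours. Note that `LocalCluster` lets the scheduler pick a free port rather than using `8786`.\n",
    "\n",
    "```python\n",
    "from dask.distributed import Client, LocalCluster\n",
    "\n",
    "# Workers run as threads here, so we ask for TCP explicitly.\n",
    "net_cluster = LocalCluster(\n",
    "    n_workers=2,\n",
    "    threads_per_worker=1,\n",
    "    processes=False,\n",
    "    protocol=\"tcp://\",\n",
    "    dashboard_address=\":0\",\n",
    ")\n",
    "net_client = Client(net_cluster)\n",
    "\n",
    "# The address the client connects to.\n",
    "scheduler_address = net_cluster.scheduler_address\n",
    "print(scheduler_address)\n",
    "\n",
    "# The worker addresses that the scheduler has recorded.\n",
    "worker_addresses = sorted(net_client.scheduler_info()[\"workers\"])\n",
    "for address in worker_addresses:\n",
    "    print(address)\n",
    "\n",
    "net_client.close()\n",
    "net_cluster.close()\n",
    "```\n",
    "\n",
    "Each worker address should carry a different port, which is the port that worker reported to the scheduler when it registered."
   ]
  },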
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TLS/SSL for secure communication\n",
    "\n",
    "Dask cluster components can use certificates to mutually authenticate and communicate securely if run in an untrusted envronment. You can either generate certificates for the scheduler, worker and client automatically and distribute those or you can generate temporary credentials. \n",
    "\n",
    "Some cluster managers such as `dask-cloudprovider` will automatically enable TLS and generate one-time certificates when exposing clusters to the internet from the public cloud."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Scheduler, Worker, Client\n",
    "from distributed.security import Security\n",
    "\n",
    "security = Security.temporary()\n",
    "\n",
    "async with Scheduler(security=security) as scheduler:\n",
    "    async with Worker(scheduler.address, security=security) as worker:\n",
    "        async with Client(\n",
    "            scheduler.address, security=security, asynchronous=True\n",
    "        ) as client:\n",
    "            print(await client.submit(lambda: 1 + 1))"
   ]
  },
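  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cell above uses temporary credentials. For the other option, distributing certificates that already exist on disk, a `Security` object can point at the files instead. The sketch below shows only the client-side fields, and the file paths are hypothetical placeholders rather than files that ship with this tutorial.\n",
    "\n",
    "```python\n",
    "from distributed.security import Security\n",
    "\n",
    "# Hypothetical file paths; replace them with certificates you have generated.\n",
    "security_from_files = Security(\n",
    "    tls_ca_file=\"ca.pem\",\n",
    "    tls_client_cert=\"client-cert.pem\",\n",
    "    tls_client_key=\"client-key.pem\",\n",
    "    require_encryption=True,\n",
    ")\n",
    "print(security_from_files.require_encryption)\n",
    "```\n",
    "\n",
    "The scheduler and workers have corresponding `tls_scheduler_*` and `tls_worker_*` fields, so each component can be given its own certificate and key."
   ]
  },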
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Websockets\n",
    "\n",
    "Dask can also communicate via websockets instead of TCP. There is a very small performance overhead to doing this but it means that the dashboard and communication happen on the same port and can be reverse proxied by a layer 7 proxy like nginx. This is necessary for some deployment scenarios where you cannot exprt ports but you can proxy web services."
   ]
  },
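  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a hedged sketch of what this looks like in practice, the components from the earlier examples can be started with `protocol=\"ws://\"`. This assumes the websocket comm is available in your installed version of `distributed`; the function name `add_over_websockets` is ours.\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "\n",
    "from dask.distributed import Client, Scheduler, Worker\n",
    "\n",
    "\n",
    "async def add_over_websockets():\n",
    "    # The scheduler listens for websocket connections instead of raw TCP.\n",
    "    async with Scheduler(protocol=\"ws://\", dashboard_address=\":0\") as scheduler:\n",
    "        # The worker and client pick up the protocol from the scheduler address.\n",
    "        async with Worker(scheduler.address, nthreads=1) as worker:\n",
    "            async with Client(scheduler.address, asynchronous=True) as client:\n",
    "                total = await client.submit(sum, [1, 2, 3])\n",
    "                return scheduler.address, total\n",
    "\n",
    "\n",
    "ws_address, ws_total = asyncio.run(add_over_websockets())\n",
    "print(ws_address, ws_total)\n",
    "```\n",
    "\n",
    "In a notebook, where an event loop is already running, use `await add_over_websockets()` instead of `asyncio.run(...)`."
   ]
  },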
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### UCX\n",
    "\n",
    "On systems with high performance networking such as Infiniband or NVLink Dask can also leverage [UCX](https://openucx.org/) which provides a unified communication protocol that automatically upgrades communication to use the fastest hardware available. This is vital for good performance on HPC systems with Infiniband or systems with multiple GPU workers."
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "interpreter": {
   "hash": "b7cfefc49ba37c014f46ba4939b08cd2bb5b25f27a21eb8109ce6441079573de"
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
